package rxgo

import (
	"context"
	"reflect"
	"sync"
)

// filter node implementation of streamOperator
type filtersOperater struct {
	opFunc func(ctx context.Context, o *Observable, item reflect.Value, out chan interface{}) (end bool)
}

func (flop filtersOperater) op(ctx context.Context, o *Observable) {
	// must hold defintion of flow resourcs here, such as chan etc., that is allocated when connected
	// this resurces may be changed when operation routine is running.
	in := o.pred.outflow
	out := o.outflow
	//fmt.Println(o.name, "operator in/out chan ", in, out)
	var wg sync.WaitGroup

	go func() {
		end := false
		for x := range in {
			if end {
				continue
			}
			// can not pass a interface as parameter (pointer) to gorountion for it may change its value outside!
			xv := reflect.ValueOf(x)
			// send an error to stream if the flip not accept error
			if e, ok := x.(error); ok && !o.flip_accept_error {
				o.sendToFlow(ctx, e, out)
				continue
			}
			// scheduler
			switch threading := o.threading; threading {
			case ThreadingDefault:
				if flop.opFunc(ctx, o, xv, out) {
					end = true
				}
			case ThreadingIO:
				fallthrough
			case ThreadingComputing:
				wg.Add(1)
				go func() {
					defer wg.Done()
					if flop.opFunc(ctx, o, xv, out) {
						end = true
					}
				}()
			default:
			}
		}

		wg.Wait() //waiting all go-routines completed
		o.closeFlow(out)
	}()
}

// Skip skip specified number of items
func (parent *Observable) Skip(count int) (o *Observable) {
	o = parent.newFilterObservable("skip")
	o.skip = count
	o.index = 0
	o.operator = skipOperator
	return o
}

var skipOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
	skip, stop, e := userNext()
	item := x.Interface()
	o.index++

	if o.index > o.skip {
		if stop {
			end = true
			return
		}
		if skip {
			return
		}
		if e != nil {
			item = e
		}
		// send data
		if !end {
			end = o.sendToFlow(ctx, item, out)
		}
	}

	return
}}

// Last take the last item
func (parent *Observable) Last() (o *Observable) {
	o = parent.newFilterObservable("last")
	o.index = 0
	o.operator = lastOperator
	return o
}

var lastOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
	skip, stop, e := userNext()
	item := x.Interface()
	o.index++
	hasElement := false
	o.pred.ElementAt(o.index).Subscribe(func(res interface{}) {
		hasElement = true
	})

	if !hasElement {
		if stop {
			end = true
			return
		}
		if skip {
			return
		}
		if e != nil {
			item = e
		}
		// send data
		if !end {
			end = o.sendToFlow(ctx, item, out)
		}
	}

	return
}}

// Take take specified number of items
func (parent *Observable) Take(count int) (o *Observable) {
	o = parent.newFilterObservable("take")
	o.take = count
	o.index = 0
	o.operator = takeOperator
	return o
}

var takeOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
	skip, stop, e := userNext()
	item := x.Interface()
	o.index++

	if o.index <= o.take {
		if stop {
			end = true
			return
		}
		if skip {
			return
		}
		if e != nil {
			item = e
		}
		// send data
		if !end {
			end = o.sendToFlow(ctx, item, out)
		}
	}

	return
}}

// TakeLast take last specified number of items
func (parent *Observable) TakeLast(count int) (o *Observable) {
	o = parent.newFilterObservable("takeLast")
	o.take = count
	o.index = 0
	o.operator = takeLastOperator
	return o
}

var takeLastOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
	skip, stop, e := userNext()
	item := x.Interface()
	o.index++

	take := true
	o.pred.ElementAt(o.index + o.take - 1).Subscribe(func(res interface{}) {
		take = false
	})

	if take {
		if stop {
			end = true
			return
		}
		if skip {
			return
		}
		if e != nil {
			item = e
		}
		// send data
		if !end {
			end = o.sendToFlow(ctx, item, out)
		}
	}

	return
}}

// SkipLast take last specified number of items
func (parent *Observable) SkipLast(count int) (o *Observable) {
	o = parent.newFilterObservable("skipLast")
	o.skip = count
	o.index = 0
	o.operator = skipLastOperator
	return o
}

var skipLastOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
	skip, stop, e := userNext()
	item := x.Interface()
	o.index++

	take := false
	o.pred.ElementAt(o.index + o.skip - 1).Subscribe(func(res interface{}) {
		take = true
	})

	if take {
		if stop {
			end = true
			return
		}
		if skip {
			return
		}
		if e != nil {
			item = e
		}
		// send data
		if !end {
			end = o.sendToFlow(ctx, item, out)
		}
	}

	return
}}

// ElementAt take the first item
func (parent *Observable) ElementAt(index int) (o *Observable) {
	o = parent.newFilterObservable("elementAt")
	o.index = 0
	o.take = index
	o.operator = elementAtOperator
	return o
}

var elementAtOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
	skip, stop, e := userNext()
	item := x.Interface()
	o.index++

	if o.index-1 == o.take {
		if stop {
			end = true
			return
		}
		if skip {
			return
		}
		if e != nil {
			item = e
		}
		// send data
		if !end {
			end = o.sendToFlow(ctx, item, out)
		}
	}

	return
}}

// First take the first item
func (parent *Observable) FirstWhile(f interface{}) (o *Observable) {
	// check validation of f
	fv := reflect.ValueOf(f)
	inType := []reflect.Type{typeAny}
	outType := []reflect.Type{typeBool}
	b, ctx_sup := checkFuncUpcast(fv, inType, outType, true)
	if !b {
		panic(ErrFuncFlip)
	}
	o = parent.newFilterObservable("firstWhile")
	o.flip_accept_error = checkFuncAcceptError(fv)

	o.flip_sup_ctx = ctx_sup
	o.flip = fv.Interface()
	o.take = 0
	o.operator = firstWhileOperator
	return o
}

var firstWhileOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {

	fv := reflect.ValueOf(o.flip)
	var params = []reflect.Value{x}
	rs, skip, stop, e := userFuncCall(fv, params)

	var item interface{} = rs[0].Interface()
	if stop {
		end = true
		return
	}
	if skip {
		return
	}
	if e != nil {
		item = e
	}

	// send data
	if !end {
		if b, ok := item.(bool); ok && b && o.take == 0 {
			o.take++
			end = o.sendToFlow(ctx, x.Interface(), out)
		}
	}

	return
}}

// First take the first item
func (parent *Observable) First() (o *Observable) {
	o = parent.newFilterObservable("first")
	o.index = 0
	o.operator = firstOperator
	return o
}

var firstOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
	skip, stop, e := userNext()
	item := x.Interface()
	o.index++

	if o.index == 1 {
		if stop {
			end = true
			return
		}
		if skip {
			return
		}
		if e != nil {
			item = e
		}
		// send data
		if !end {
			end = o.sendToFlow(ctx, item, out)
		}
	}

	return
}}

// Distinct remove duplicated items
func (parent *Observable) Distinct() (o *Observable) {
	o = parent.newFilterObservable("distinct")
	o.operator = distinctOperator
	return o
}

var distinctOperator = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
	skip, stop, e := userNext()
	item := x.Interface()
	if o.hashset == nil {
		o.hashset = make(map[interface{}]bool)
	}
	_, exists := o.hashset[item]

	if !exists {
		o.hashset[item] = true
		if stop {
			end = true
			return
		}
		if skip {
			return
		}
		if e != nil {
			item = e
		}
		// send data
		if !end {
			end = o.sendToFlow(ctx, item, out)
		}
	}

	return
}}

// Filter `func(x anytype) bool` filters items in the original Observable and returns
// a new Observable with the filtered items.
func (parent *Observable) Filter(f interface{}) (o *Observable) {
	// check validation of f
	fv := reflect.ValueOf(f)
	inType := []reflect.Type{typeAny}
	outType := []reflect.Type{typeBool}
	b, ctx_sup := checkFuncUpcast(fv, inType, outType, true)
	if !b {
		panic(ErrFuncFlip)
	}

	o = parent.newFilterObservable("filter")
	o.flip_accept_error = checkFuncAcceptError(fv)

	o.flip_sup_ctx = ctx_sup
	o.flip = fv.Interface()
	o.operator = filterOperater
	return o
}

var filterOperater = filtersOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {

	fv := reflect.ValueOf(o.flip)
	var params = []reflect.Value{x}
	rs, skip, stop, e := userFuncCall(fv, params)

	var item interface{} = rs[0].Interface()
	if stop {
		end = true
		return
	}
	if skip {
		return
	}
	if e != nil {
		item = e
	}
	// send data
	if !end {
		if b, ok := item.(bool); ok && b {
			end = o.sendToFlow(ctx, x.Interface(), out)
		}
	}

	return
}}

func (parent *Observable) newFilterObservable(name string) (o *Observable) {
	//new Observable
	o = newObservable()
	o.Name = name

	//chain Observables
	parent.next = o
	o.pred = parent
	o.root = parent.root

	return o
}
